package github.saw96x.remoting.transport.netty.client;

import github.saw96x.common.enums.CompressTypeEnum;
import github.saw96x.common.enums.SerializationTypeEnum;
import github.saw96x.common.factory.SingletonFactory;
import github.saw96x.remoting.constants.RpcConstants;
import github.saw96x.remoting.pojo.RpcMessage;
import github.saw96x.remoting.pojo.RpcResponse;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;

/**
 * Client-side inbound handler for RPC traffic.
 *
 * <p>It does three things:
 * <ul>
 *   <li>hands RPC responses received from the server to {@link UnprocessedRequests};</li>
 *   <li>sends a heartbeat request when the channel reports a writer-idle event;</li>
 *   <li>logs any exception raised in the pipeline and closes the channel.</li>
 * </ul>
 *
 * @author Saw96x
 * @date 2022/3/18 9:35
 */
@Slf4j
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
  private final UnprocessedRequests unprocessedRequests;
  private final NettyRpcClient nettyRpcClient;

  /**
   * Creates the handler, obtaining its collaborators from {@link SingletonFactory}.
   */
  public NettyRpcClientHandler() {
    this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
    this.nettyRpcClient = SingletonFactory.getInstance(NettyRpcClient.class);
  }

  /**
   * Reads a message transmitted by the server.
   *
   * <p>Heartbeat responses are only logged; RPC responses are passed to
   * {@link UnprocessedRequests#complete}. Any other message is ignored. The message is
   * always released afterwards, whether or not it was handled.
   *
   * @param ctx the handler context of the channel the message arrived on
   * @param msg the inbound message
   */
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) {
    try {
      log.info("client receive msg: [{}]", msg);
      if (msg instanceof RpcMessage) {
        RpcMessage tmp = (RpcMessage) msg;
        byte messageType = tmp.getMessageType();
        if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
          log.info("heart [{}]", tmp.getData());
        } else if (messageType == RpcConstants.RESPONSE_TYPE) {
          // NOTE(review): assumes the decoder always puts an RpcResponse in the payload of a
          // RESPONSE_TYPE message; generic type arguments are erased, so only that is checked
          // at runtime.
          @SuppressWarnings("unchecked")
          RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();
          unprocessedRequests.complete(rpcResponse);
        }
      }
    } finally {
      // This handler is the consumer of the message, so it is responsible for releasing it.
      ReferenceCountUtil.release(msg);
    }
  }

  /**
   * Reacts to user events fired through the pipeline.
   *
   * <p>On a {@link IdleState#WRITER_IDLE} event a heartbeat request is written to the channel
   * that {@link NettyRpcClient#getChannel} returns for this connection's remote address; the
   * channel is closed if that write fails. Other idle states are ignored, and events that are
   * not {@link IdleStateEvent}s are forwarded to the superclass.
   *
   * @param ctx the handler context of the channel the event was fired on
   * @param evt the user event
   * @throws Exception if the superclass handling of a non-idle event fails
   */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    if (evt instanceof IdleStateEvent) {
      IdleState state = ((IdleStateEvent) evt).state();
      if (state == IdleState.WRITER_IDLE) {
        log.info("write idle happen [{}]", ctx.channel().remoteAddress());
        Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());
        channel.writeAndFlush(buildHeartbeatRequest()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
      }
    } else {
      super.userEventTriggered(ctx, evt);
    }
  }

  /**
   * Called when an exception occurs in processing a client message: logs it and closes the
   * channel.
   *
   * @param ctx   the handler context of the affected channel
   * @param cause the exception that was raised
   */
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    // The logger already records the full stack trace; printing it again to stderr would only
    // duplicate the output and bypass the logging configuration.
    log.error("client catch exception：", cause);
    ctx.close();
  }

  /**
   * Builds the heartbeat (PING) request sent when the connection has been write-idle.
   */
  private static RpcMessage buildHeartbeatRequest() {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
    rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
    rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
    rpcMessage.setData(RpcConstants.PING);
    return rpcMessage;
  }

}

